In [1]:
import numpy as np
import pandas as pd

# sklearn
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.datasets import make_moons, make_circles, make_classification
from sklearn.metrics import mean_squared_error, mean_absolute_error

# Pytorch
import torch
from torch.autograd import Variable
import torch.nn as nn 
import torchvision.transforms as transforms

# Visualisation libraries

## progressbar
import progressbar

## matplotlib
import matplotlib.pyplot as plt
%matplotlib inline

## seaborn
import seaborn as sns
sns.set_context("paper", rc={"font.size":12,"axes.titlesize":14,"axes.labelsize":12})
sns.set_style("white")

## plotly
from plotly.offline import init_notebook_mode, iplot 
import plotly.graph_objs as go
import plotly.offline as py
from plotly.subplots import make_subplots
import plotly.express as px
%config InlineBackend.figure_format = 'retina' 

import warnings
warnings.filterwarnings("ignore")
PyTorch Logistic Regression

Dataset

A random n-class classification dataset can be generated using sklearn.datasets.make_classification. Here, we generate a dataset with two features and 6000 instances. Moreover, the dataset is generated for multiclass classification with four classes.

In [2]:
# Synthetic classification data: 6000 samples, two informative features,
# four classes with a single cluster each (fixed seed for repeatability).
X, y = make_classification(
    n_samples=int(6e3),
    n_features=2,
    n_informative=2,
    n_redundant=0,
    n_classes=4,
    n_clusters_per_class=1,
    random_state=1,
)
# Class identifiers as strings, used later as tick/legend labels.
Labels = list(map(str, np.unique(y)))

Modeling

Train and Test sets

One of the efficient methods of splitting a dataset into random train and test subsets is using sklearn.model_selection.train_test_split.

In [3]:
# Hold out 30% of the samples for testing (fixed seed for repeatability).
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42, test_size=0.3)

# One-row summary table with the shape of every split.
pd.DataFrame(
    {
        'Set': ['X_train', 'X_test', 'y_train', 'y_test'],
        'Shape': [part.shape for part in (X_train, X_test, y_train, y_test)],
    }
).set_index('Set').T
Out[3]:
Set X_train X_test y_train y_test
Shape (4200, 2) (1800, 2) (4200,) (1800,)
In [4]:
# Train and test tensors

# Place the data on the GPU when one is available, otherwise on the CPU.
# (torch.autograd.Variable is a deprecated no-op wrapper, so plain tensors are used.)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

X_train_tensor = torch.from_numpy(X_train).to(device)
y_train_tensor = torch.from_numpy(y_train).long().to(device)
X_test_tensor = torch.from_numpy(X_test).to(device)
y_test_tensor = torch.from_numpy(y_test).long().to(device)


Batch_size = 100
iteration_number = 1e3

# Number of full passes over the training data needed to reach roughly
# `iteration_number` optimizer steps at the chosen batch size.
epochs_number = int(iteration_number / (len(X_train) / Batch_size))

# Pytorch train and test sets
Train_set = torch.utils.data.TensorDataset(X_train_tensor, y_train_tensor)
Test_set = torch.utils.data.TensorDataset(X_test_tensor, y_test_tensor)

# data loaders
train_loader = torch.utils.data.DataLoader(Train_set, batch_size = Batch_size, shuffle = False)
# The test loader must iterate over the held-out Test_set; it previously wrapped
# Train_set, so every "test" accuracy was actually measured on the training data.
test_loader = torch.utils.data.DataLoader(Test_set, batch_size = Batch_size, shuffle = False)

Logistic Regression

We have talked about Logistic Regression using sklearn. Here, we would like to fit the logistic regression iteratively using an optimization algorithm. At each iteration, the algorithm uses the Cross-Entropy Loss to measure the loss, then computes the gradient and updates the model. At the end of this iterative process, we should reach a better level of agreement between the true and predicted labels, since the error should be lower than that of the first step.

In [5]:
class LogisticRegressionModel(torch.nn.Module):
    """A single fully connected layer from `input_Size` features to `output_Size` outputs.

    `forward` returns the output of the linear layer unchanged; no activation
    is applied inside the model.
    """

    def __init__(self, input_Size, output_Size):
        super().__init__()
        self.linear = torch.nn.Linear(in_features=input_Size, out_features=output_Size)

    def forward(self, x):
        """Apply the linear layer to `x` and return the result."""
        return self.linear(x)
In [6]:
input_Size, output_Size = len(X[0]), len(np.unique(y))

# model
model = LogisticRegressionModel(input_Size, output_Size)

# GPU
if torch.cuda.is_available():
    model.cuda()

# Cross Entropy Loss
CEL = nn.CrossEntropyLoss()

# Optimizer
learning_rate = 1e-2
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

# Training the Model
Count = 0
Loss_list = []
Iteration_list = []
Accuracy_list = []
MSE_list = []
MAE_list = []
Steps = 10  # evaluate and record metrics every `Steps` optimizer updates

# The bar is updated once per optimizer step, so its maximum is the exact number
# of steps the loops below perform (at least 1 to keep the bar well defined).
total_iterations = max(epochs_number * len(train_loader), 1)
Progress_Bar = progressbar.ProgressBar(maxval= total_iterations,
                                       widgets=[progressbar.Bar('=', '|', '|'),
                                                progressbar.Percentage()])

for epoch in range(epochs_number):
    for Xtr, ytr in train_loader:

        # Flatten each batch to (batch, input_Size) and cast to float32 for the linear layer
        Xtr = Xtr.view(-1, input_Size).float()

        # Set all gradients to zero
        optimizer.zero_grad()

        # Forward
        Out = model(Xtr)

        # loss
        loss = CEL(Out, ytr.long())

        # Backward (Calculating the gradients)
        loss.backward()

        # Update parameters
        optimizer.step()

        Count += 1

        # Periodic evaluation on the batches served by test_loader
        if Count % Steps == 0:
            Correct, Total = 0, 0
            # No gradients are needed while scoring, so skip building the autograd graph
            with torch.no_grad():
                for Xts, yts in test_loader:
                    # Forward
                    Out = model(Xts.view(-1, input_Size).float())

                    # Index of the largest output per row is the predicted class
                    Predicted = torch.max(Out, 1)[1]

                    # Total number of yts
                    Total += len(yts)

                    # Total correct predictions, accumulated as a plain Python int
                    Correct += (Predicted == yts).sum().item()

            # Store plain Python numbers so the history does not hold on to tensors
            Loss_list.append(loss.item())
            Iteration_list.append(Count)
            Accuracy_list.append(Correct / float(Total))

        Progress_Bar.update(Count)

Progress_Bar.finish()

history = pd.DataFrame({'Iteration': np.array(Iteration_list),
                        'Loss': np.array(Loss_list),
                        'Accuracy': np.array(Accuracy_list)})
del Loss_list, Iteration_list, Accuracy_list
|=========================================================================|100%

Let's define some functions with which we can analyze the performance of the model.

In [7]:
def Plot_history(history, Table_Rows = 25, yLim = 2):
    """Plot the training history and show a sampled table of it side by side.

    Parameters
    ----------
    history : pandas.DataFrame
        Must contain the columns 'Iteration', 'Loss' and 'Accuracy'.
    Table_Rows : int
        Number of evenly spaced rows sampled for the table; the last recorded
        row is always added as well.
    yLim : float
        Upper limit of the y-axis of the line chart (the lower limit is 0).

    The figure is displayed with `fig.show()`; nothing is returned.
    """
    columns = ['Iteration', 'Loss', 'Accuracy']
    fig = make_subplots(rows=1, cols=2, horizontal_spacing = 0.02, column_widths=[0.6, 0.4],
                        specs=[[{"type": "scatter"},{"type": "table"}]])
    # Left: loss and accuracy curves
    fig.add_trace(go.Scatter(x= history['Iteration'].values, y= history['Loss'].astype(float).values.round(4),
                             line=dict(color='OrangeRed', width= 1.5), name = 'Loss'), 1, 1)
    fig.add_trace(go.Scatter(x= history['Iteration'].values, y= history['Accuracy'].astype(float).values,
                             line=dict(color='MidnightBlue', width= 1.5),  name = 'Accuracy'), 1, 1)
    fig.update_layout(legend=dict(x=0, y=1.1, traceorder='reversed', font_size=12),
                  dragmode='select', plot_bgcolor= 'white', height=600, hovermode='closest',
                  legend_orientation='h')
    fig.update_xaxes(range=[history.Iteration.min(), history.Iteration.max()],
                     showgrid=True, gridwidth=1, gridcolor='Lightgray',
                     showline=True, linewidth=1, linecolor='Lightgray', mirror=True, row=1, col=1)
    fig.update_yaxes(range=[0, yLim], showgrid=True, gridwidth=1, gridcolor='Lightgray',
                     showline=True, linewidth=1, linecolor='Lightgray', mirror=True, row=1, col=1)
    # Right: evenly spaced sample of rows, selected by position
    n_rows = history.shape[0]
    positions = np.linspace(0, n_rows, Table_Rows, endpoint = False).round(0).astype(int)
    if n_rows > 0:
        # Always include the final row. The previous code appended the last
        # *Iteration value* here, which is not a row position, so the final
        # row was silently dropped from the table.
        positions = np.append(positions, n_rows - 1)
    # Rounding can yield a position equal to n_rows; discard anything out of range
    positions = np.unique(positions[positions < n_rows])
    table = history.iloc[positions]
    fig.add_trace(go.Table(header=dict(values = columns, line_color='darkslategray',
                                       fill_color='DimGray', align=['center','center'],
                                       font=dict(color='white', size=12), height=25),
                           columnwidth = [0.4] * len(columns),
                           cells=dict(values=[table.Iteration, table.Loss.astype(float).round(4).values,
                                          table.Accuracy.astype(float).round(4).values],
                                      line_color='darkslategray', fill=dict(color=['WhiteSmoke', 'white']),
                                      align=['center', 'center'], font_size=12,height=20)), 1, 2)
    fig.show()
    
def Confusion_Matrix(Model, X, y, Labels, FG = (14, 5)):
    """Draw the raw and row-normalised confusion matrices of `Model` on (X, y).

    Parameters
    ----------
    Model : callable
        Called with a float32 tensor built from `X`; the index of the largest
        value along dimension 1 of its output is taken as the predicted class.
    X : array-like or torch.Tensor
        Input features to score.
    y : array-like or torch.Tensor
        True labels matching the rows of `X`.
    Labels : list of str
        Tick labels for both axes of both heatmaps.
    FG : tuple
        Figure size passed to `plt.subplots`.

    Returns
    -------
    (fig, ax) : the matplotlib figure and its array of two axes.
    """
    fig, ax = plt.subplots(1, 2, figsize=FG)

    # Run the model on whatever device its parameters live on; fall back to the
    # CUDA-if-available rule when the model exposes no parameters.
    parameters = getattr(Model, 'parameters', None)
    first_parameter = next(iter(parameters()), None) if callable(parameters) else None
    if first_parameter is not None:
        device = first_parameter.device
    else:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Predictions for the data that was passed in. The previous version ignored
    # Model/X/y and always scored the global test tensors with the global model.
    inputs = torch.as_tensor(X).float().to(device)
    with torch.no_grad():
        scores = Model(inputs)
    y_pred = torch.max(scores, 1)[1].cpu().numpy()
    y_true = y.cpu().numpy() if torch.is_tensor(y) else np.asarray(y)

    # confusion matrix
    CM = confusion_matrix(y_true, y_pred)

    _ = sns.heatmap(CM.round(2), annot=True, annot_kws={"size": 14}, cmap="Blues", ax = ax[0])
    _ = ax[0].set_xlabel('Predicted labels')
    _ = ax[0].set_ylabel('True labels')
    _ = ax[0].set_title('Confusion Matrix')
    _ = ax[0].xaxis.set_ticklabels(Labels)
    _ = ax[0].yaxis.set_ticklabels(Labels)

    # Normalise each row by its total; rows that sum to zero stay at 0 instead of NaN
    row_totals = CM.sum(axis=1)[:, np.newaxis]
    CM = np.divide(CM.astype('float'), row_totals,
                   out=np.zeros(CM.shape, dtype=float), where=row_totals != 0)
    _ = sns.heatmap(CM.round(2), annot=True, annot_kws={"size": 14}, cmap="Greens", ax = ax[1],
                   linewidths = 0.2, vmin=0, vmax=1, cbar_kws={"shrink": 1})
    _ = ax[1].set_xlabel('Predicted labels')
    _ = ax[1].set_ylabel('True labels')
    _ = ax[1].set_title('Normalized Confusion Matrix')
    _ = ax[1].xaxis.set_ticklabels(Labels)
    _ = ax[1].yaxis.set_ticklabels(Labels)
    return fig, ax

def Plot_Classification(Model, X, y, Labels, BP = .5, Alpha=0.6, ax = False, fs = 7, ColorMap =  'Spectral'):
    """Plot the predicted class regions of `Model` with the samples drawn on top.

    Parameters
    ----------
    Model : callable
        Called with a float32 tensor of grid points; the index of the largest
        value along dimension 1 of its output is taken as the predicted class.
    X : numpy.ndarray
        Samples to scatter; columns 0 and 1 are used as the plot coordinates.
    y : array-like
        Values used to colour the scattered samples.
    Labels : list of str
        Legend labels.
    BP : float
        Legend border padding.
    Alpha : float
        Opacity of the scattered samples.
    ax : matplotlib Axes, False or None
        Axes to draw on; a new square figure of side `fs` is created when
        `ax` is False or None.
    fs : float
        Side length of the figure created when no axes are supplied.
    ColorMap : str
        Colormap used for both the regions and the samples.
    """
    h=0.02      # grid step
    pad=0.25    # margin around the data range
    # adding margins
    x_min, x_max = X[:, 0].min()-pad, X[:, 0].max()+pad
    y_min, y_max = X[:, 1].min()-pad, X[:, 1].max()+pad
    # Generating meshgrids
    xx, yy = np.meshgrid(np.arange(x_min, x_max, h), np.arange(y_min, y_max, h))

    # Put the grid on the device the model's parameters live on; fall back to the
    # CUDA-if-available rule when the model exposes no parameters.
    parameters = getattr(Model, 'parameters', None)
    first_parameter = next(iter(parameters()), None) if callable(parameters) else None
    if first_parameter is not None:
        device = first_parameter.device
    else:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    Temp = torch.from_numpy(np.c_[xx.ravel(), yy.ravel()]).float().to(device)

    # Predictions (no gradients are needed for plotting)
    with torch.no_grad():
        Pred = Model(Temp)
    Pred = torch.max(Pred, 1)[1].cpu().numpy().reshape(xx.shape)

    # Figure: identity checks avoid comparing an Axes object with a bool
    if ax is False or ax is None:
        fig, ax = plt.subplots(1, 1, figsize=(fs, fs))
    _ = ax.contourf(xx, yy, Pred, cmap = ColorMap, alpha=0.2)

    scatter = ax.scatter(X[:,0], X[:,1], s=70, c=y, edgecolor = 'Navy', alpha = Alpha, cmap = ColorMap)
    _ = ax.legend(handles=scatter.legend_elements()[0], labels= Labels,
                  fancybox=True, framealpha=1, shadow=True, borderpad=BP, loc='best', fontsize = 14)
    _ = ax.set_xlim(x_min, x_max)
    _ = ax.set_ylim(y_min, y_max)
    _ = ax.set_xlabel(r'$X_1$')
    _ = ax.set_ylabel(r'$X_2$')

Model Optimization Plot

In [8]:
# Show the recorded loss/accuracy curves next to a sampled table of the same history
Plot_history(history)

Confusion Matrix

The confusion matrix allows for visualization of the performance of an algorithm.

In [9]:
# One confusion-matrix figure per split, titled accordingly
for features, targets, title in ((X_train, y_train, 'Train Set'),
                                 (X_test, y_test, 'Test Set')):
    fig, _ = Confusion_Matrix(model, features, targets, Labels)
    _ = fig.suptitle(title, fontsize = 16)

Plot Classification

In [10]:
fig, ax = plt.subplots(1, 2, figsize=(16, 7))
# Left panel: train split, right panel: test split
splits = ((X_train, y_train, 'Train Set'), (X_test, y_test, 'Test Set'))
for panel, (features, targets, title) in zip(ax, splits):
    Plot_Classification(model, features, targets, Labels, ax = panel)
    _ = panel.set_title(title, fontsize = 16)